Lab 12: Spark Streaming

Introduction

In this lab, we're going to look at data streaming with Apache Spark. At the end of the lab, you should be able to:

  • Create a local StreamingContext object.
  • Use Spark to analyse the recent Wikipedia edits stream.

Getting started

Let's start by importing the packages we'll need. This week, we'll also need the sseclient package so we can connect to the Wikipedia stream. It isn't installed on the student vDesktop environments, but if you're running at home or using Docker you can install it by executing the code in the box below:


In [ ]:
!pip install sseclient

Like last week, we're going to use pyspark, a Python package that wraps Apache Spark and makes its functionality available in Python. We'll also use a few of the standard Python libraries - json, socket, threading and time - as well as the sseclient package you just installed to connect to the event stream.

Note: You don't need to understand how these packages are used to connect to the event stream, but the code is below if you're curious.


In [ ]:
import json
import pyspark
import socket
import threading
import time

from pyspark.streaming import StreamingContext
from sseclient import SSEClient

Streaming Wikipedia events

Currently, Spark supports three kinds of streaming source out of the box:

  1. Apache Kafka
  2. Amazon Kinesis
  3. Apache Flume

While it's possible to connect to other kinds of stream too, this requires writing a custom receiver and, at present, custom receivers are unsupported in Python (although they can be written in Java and Scala). However, Spark also supports streaming text data from arbitrary TCP socket endpoints, so we can instead relay the remote data stream to a local socket port and let Spark consume it from there.
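
For comparison, the cell below sketches roughly what using one of the built-in connectors looks like. It's illustrative only and won't run here: it assumes an older Spark 2.x build with the Kafka DStream connector on the classpath, a Kafka broker at localhost:9092 and a topic named 'recentchange' (all hypothetical), as well as a StreamingContext named ssc like the one we create later in the lab.


In [ ]:
# Illustrative sketch only: read from a (hypothetical) Kafka topic using the
# built-in connector, rather than the socket relay we use in this lab.
from pyspark.streaming.kafka import KafkaUtils

kafka_stream = KafkaUtils.createDirectStream(
    ssc,                                          # A StreamingContext, like the one created later in the lab
    ['recentchange'],                             # Topic name (hypothetical)
    {'metadata.broker.list': 'localhost:9092'})   # Kafka broker address (hypothetical)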

The code in the box below connects to the Wikipedia event stream and publishes its content to a local port. While you don't need to understand it to complete the lab, the basic logic is as follows:

  1. Connect to the Wikipedia RecentChanges stream using SSEClient.
  2. Create a local socket connection on port 50000.
  3. When a client (e.g. Spark) connects to the local socket, relay the next available event to it from the event stream.

In [ ]:
def relay():
    # Connect to the Wikipedia RecentChanges event stream
    events = SSEClient('https://stream.wikimedia.org/v2/stream/recentchange')

    # Create a local socket for Spark to connect to
    s = socket.socket()
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind(('localhost', 50000))
    s.listen(1)
    while True:
        client = None
        try:
            # Wait for a client (e.g. Spark) to connect, then relay the next
            # available event to it as a newline-terminated line of bytes
            client, address = s.accept()
            for event in events:
                if event.event == 'message':
                    client.sendall(event.data.encode('utf-8') + b'\n')
                    break
        except:
            pass
        finally:
            if client is not None:
                client.close()
    

threading.Thread(target=relay).start()
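
If you'd like to check that the relay is working before Spark gets involved, you can optionally connect to the port yourself and print whatever comes back. This isn't part of the lab - it's just a minimal sanity check, and it assumes the relay thread above has already started:


In [ ]:
# Optional sanity check (assumes the relay thread above is running): connect
# to the relay once and print the single JSON event it sends back.
check = socket.create_connection(('localhost', 50000))

data = b''
while True:
    chunk = check.recv(4096)
    if not chunk:      # The relay closes the connection after one event
        break
    data += chunk
check.close()

print(data.decode('utf-8'))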

Streaming analysis

Now that we have our stream relay set up, we can start to analyse its contents. First, let's initialise a SparkContext object, which will represent our connection to the Spark cluster. To do this, we must specify the URL of the master node to connect to. As we're only running this notebook for demonstration purposes, we can just run Spark in local mode, using as many worker threads as there are cores on the machine (local[*]):


In [ ]:
sc = pyspark.SparkContext(master='local[*]')

Next, we create a StreamingContext object, which represents the streaming functionality of our Spark cluster. When we create the context, we must specify a batch duration (in seconds), which tells Spark how often it should process the data it has collected from the stream. Let's process the Wikipedia data in batches of one second:


In [ ]:
ssc = StreamingContext(sc, 1)

Using our StreamingContext object, we can create a data stream from our local TCP relay socket with the socketTextStream method:


In [ ]:
stream = ssc.socketTextStream('localhost', 50000)

Even though we've created a data stream, nothing happens! Before Spark starts to consume the stream, we must first define one or more operations to perform on it. Let's count the number of edits made by different users in the last minute:


In [ ]:
users = (
    stream.map(json.loads)                   # Parse the stream data as JSON
          .map(lambda obj: obj['user'])      # Extract the value of the 'user' key
          .map(lambda user: (user, 1))       # Give each user a count of one
          .window(60)                        # Create a sliding window, sixty seconds in length
          .reduceByKey(lambda a, b: a + b)   # Reduce all key-value pairs in the window by adding counts
          .transform(                        # Sort by the largest count
              lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False))
)

users.pprint()                               # Print the first few results of each batch

Again, nothing happens! This is because the StreamingContext must be started before the stream is processed by Spark. We can start data streaming using the start method of the StreamingContext and stop it using the stop method. Let's run the stream for two minutes (120 seconds) and then stop:


In [ ]:
ssc.start()

time.sleep(120)

ssc.stop()

As can be seen, Spark counts the number of edits made by each user in the past sixty seconds and emits an updated count once per second. This is because the slide duration of the window defaults to the batch duration of the StreamingContext, which we set to one second earlier.
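
If you want finer control over how often the window is evaluated, the window and slide durations can be given explicitly. The sketch below isn't part of the lab: it rebuilds the same per-user counts with reduceByKeyAndWindow, using a sixty-second window that slides every ten seconds (so updates would appear every ten seconds instead of every one). It assumes a fresh StreamingContext and socket stream, since the ones above have been stopped.


In [ ]:
# Sketch only: the same per-user counts, with the window duration (60 seconds)
# and slide duration (10 seconds) spelled out via reduceByKeyAndWindow.
# Assumes `stream` comes from a fresh, running StreamingContext.
counts = (
    stream.map(json.loads)
          .map(lambda obj: (obj['user'], 1))
          .reduceByKeyAndWindow(lambda a, b: a + b, None, 60, 10)
)

counts.pprint()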